package com.dec.kafka.offset.reset;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * One-shot command-line tool that force-commits consumer-group offsets for a fixed,
 * hard-coded set of topic/group/partition/offset tuples, so downstream consumers
 * (HDFS or TSDB sinks) resume from a known-good position.
 *
 * <p>Usage: {@code KafkaOffsetResetUtil <repair>} where {@code repair} is
 * {@code 0} to repair the HDFS consumer groups or {@code 1} to repair the TSDB groups.
 */
public class KafkaOffsetResetUtil {
    // FIX: was LoggerFactory.getLogger(KafkaOffsetReset.class) — wrong class.
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOffsetResetUtil.class);

    private static String bootstrap_servers = "kafka001:6667,kafka002:6667,kafka003:6667,kafka004:6667,kafka005:6667";
    private static String offset_reset_config = "earliest";

    /**
     * Entry point.
     *
     * @param args args[0] selects the repair set: 0 = HDFS groups, 1 = TSDB groups.
     *             Each hard-coded entry is {topic, groupId, partition, offset}.
     * @throws InterruptedException propagated from {@link #reset}
     */
    public static void main(String[] args) throws InterruptedException {
        // FIX: guard against a missing argument instead of throwing a bare
        // ArrayIndexOutOfBoundsException with no usage hint.
        if (args.length < 1) {
            LOGGER.error("Usage: KafkaOffsetResetUtil <repair: 0=HDFS, 1=TSDB>");
            return;
        }
        int repair = Integer.parseInt(args[0]);

        List<String[]> list = new ArrayList<String[]>();

        if (repair == 0) { // repair the HDFS sink consumer groups
            list.add(new String[]{"rds-rt-dealed-t00101", "c1_cd_g_rt_hdfs_t00101", "0", "701734"});
            list.add(new String[]{"rds-rt-dealed-t00102", "c1_cd_g_rt_hdfs_t00102", "0", "626597"});
            list.add(new String[]{"rds-rt-dealed-t00202", "c1_cd_g_rt_hdfs_t00202", "0", "7966"});
        } else if (repair == 1) { // repair the TSDB sink consumer groups
            list.add(new String[]{"rds-rt-dealed-t00101", "c1_g_rt_tsdb_t00101", "0", "651451"});
            list.add(new String[]{"rds-rt-dealed-t00102", "c1_g_rt_tsdb_t00102", "0", "576308"});
            // NOTE(review): topic t00201 paired with group ..._t00202 — looks intentional
            // in the original but worth confirming against the deployment.
            list.add(new String[]{"rds-rt-dealed-t00201", "c1_g_rt_tsdb_t00202", "0", "0"});
        } else {
            // FIX: previously an unknown mode silently iterated over an empty list.
            LOGGER.error("Unknown repair mode: {} (expected 0 or 1)", repair);
            return;
        }

        for (String[] entry : list) {
            String topic = entry[0];
            String groupId = entry[1];
            int partition = Integer.parseInt(entry[2]);
            int offset = Integer.parseInt(entry[3]);
            // FIX: close each consumer — the original leaked one consumer per iteration.
            try (KafkaConsumer<String, String> consumer =
                         initConsumer(offset_reset_config, groupId, bootstrap_servers)) {
                reset(consumer, topic, groupId, partition, offset);
                LOGGER.warn("Committed offset {} for group={} topic={} partition={}",
                        offset, groupId, topic, partition);
            }
        }
    }

    /**
     * Commits {@code offset} for the given topic/partition on behalf of the group the
     * consumer was created with, overwriting whatever offset the group had stored.
     *
     * @param consumer  consumer configured with the target group id
     * @param topic     topic whose offset is being reset
     * @param group_id  recorded as commit metadata only (for traceability); the actual
     *                  group is determined by the consumer's group.id config
     * @param partition partition to reset
     * @param offset    offset to commit
     * @throws InterruptedException declared for caller compatibility
     */
    public static void reset(KafkaConsumer<String, String> consumer,
                             String topic,
                             String group_id,
                             int partition,
                             int offset) throws InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap =
                new HashMap<TopicPartition, OffsetAndMetadata>();
        TopicPartition tp = new TopicPartition(topic, partition);
        // The group id is stored in the commit's metadata field purely as an audit trail.
        OffsetAndMetadata om = new OffsetAndMetadata(offset, group_id);
        offsetAndMetadataMap.put(tp, om);
        consumer.commitSync(offsetAndMetadataMap);
    }

    /**
     * Builds a consumer bound to {@code group_id} with manual offset management.
     *
     * @param offset_reset_config  auto.offset.reset policy (e.g. "earliest")
     * @param group_id             consumer group whose offsets will be rewritten
     * @param bootstrap_servers    comma-separated broker list
     * @return a new consumer; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> initConsumer(String offset_reset_config,
                                                             String group_id,
                                                             String bootstrap_servers) {
        Properties config = new Properties();
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset_reset_config);
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // FIX: auto-commit disabled — this tool commits offsets explicitly via commitSync,
        // and a background auto-commit could race with / undo the manual reset.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new KafkaConsumer<String, String>(config);
    }
}
